package spark.example

/**
  * Created by jiangtao7 on 2018/3/19.
  */

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.collection.immutable

/**
  * @author Administrator
  */
object HDFSWordCount {

  /**
    * Entry point: watches an HDFS directory for newly arriving text files
    * and prints per-batch word counts every 5 seconds.
    *
    * NOTE(review): the original code also built an unused Kafka direct
    * stream with placeholder parameters ("hello" -> "a"). That call could
    * not compile (the kafka-0.8 `KafkaUtils.createDirectStream` requires
    * explicit key/value decoder type parameters) and, lacking
    * "metadata.broker.list", would have failed at startup anyway — it has
    * been removed.
    *
    * @param args unused command-line arguments
    */
  def main(args: Array[String]): Unit = {
    // local[2] gives the driver two threads; the file-based source itself
    // needs no receiver core (see comment below), but batch processing
    // still needs at least one free thread.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("HDFSWordCount")
    // 5-second micro-batch interval.
    val ssc = new StreamingContext(conf, Seconds(5))

    /**
      * An HDFS-file-based source has no Receiver, so it does not occupy
      * a CPU core. (Translated from the original Chinese comment.)
      */
    val lines = ssc.textFileStream("hdfs://spark1:9000/wordcount_dir")
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Print a sample of each batch's (word, count) pairs to stdout.
    wordCounts.print()

    // Start the streaming computation and block until terminated.
    ssc.start()
    ssc.awaitTermination()
  }

}
